# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=C,R,W
""" Superset wrapper around pandas.DataFrame.

TODO(bkyryliuk): add support for the conventions like: *_dim or dim_*
                 dimensions, *_ts, ts_*, ds_*, *_ds - datetime, etc.
TODO(bkyryliuk): recognize integer encoded enums.

"""
import logging
from datetime import date, datetime

import numpy as np
import pandas as pd
from pandas.core.common import maybe_box_datetimelike
from pandas.core.dtypes.dtypes import ExtensionDtype

from superset.utils.core import JS_MAX_INTEGER

# Percentage (0-100) of sampled values that must be exceeded by the datetime
# conversion rate for a string column to be flagged as a date in
# ``SupersetDataFrame.columns``.
INFER_COL_TYPES_THRESHOLD = 95
# Upper bound on the number of rows sampled by ``SupersetDataFrame.columns``
# when inferring column types.
INFER_COL_TYPES_SAMPLE_SIZE = 100


def dedup(l, suffix="__", case_sensitive=True):
    """De-duplicates a list of strings by suffixing a counter

    Always returns the same number of entries as provided, and always returns
    unique values. Case sensitive comparison by default.

    Every emitted name is reserved, including the generated ones, so a
    generated name such as ``bar__1`` can never collide with an identical
    literal entry found elsewhere in the input.

    :param l: list of strings to de-duplicate
    :param suffix: separator placed between a repeated name and its counter
    :param case_sensitive: when False, names that differ only by case are
        treated as duplicates of each other
    :return: a new list, same length as ``l``, whose entries are unique

    >>> print(','.join(dedup(['foo', 'bar', 'bar', 'bar', 'Bar'])))
    foo,bar,bar__1,bar__2,Bar
    >>> print(
    ...     ','.join(dedup(['foo', 'bar', 'bar', 'bar', 'Bar'], case_sensitive=False))
    ... )
    foo,bar,bar__1,bar__2,Bar__3
    >>> print(','.join(dedup(['bar', 'bar', 'bar__1'])))
    bar,bar__1,bar__1__1
    """
    new_l = []
    # number of suffixes already handed out for each (case-normalized) name
    counters = {}
    # every (case-normalized) name that has been emitted so far
    taken = set()
    for s in l:
        key = s if case_sensitive else s.lower()
        candidate = s
        candidate_key = key
        # keep bumping the counter until the candidate is free; the counter
        # grows on every pass, so the loop always terminates
        while candidate_key in taken:
            counters[key] = counters.get(key, 0) + 1
            candidate = s + suffix + str(counters[key])
            candidate_key = candidate if case_sensitive else candidate.lower()
        taken.add(candidate_key)
        new_l.append(candidate)
    return new_l


def is_numeric(dtype):
    """Tell whether ``dtype`` describes numeric data.

    Objects exposing an ``_is_numeric`` attribute report their own answer;
    anything else is checked against ``np.number`` with ``np.issubdtype``.
    """
    if not hasattr(dtype, "_is_numeric"):
        return np.issubdtype(dtype, np.number)
    return dtype._is_numeric


class SupersetDataFrame(object):
    # Mapping numpy dtype.char to generic database types
    type_map = {
        "b": "BOOL",  # boolean
        "i": "INT",  # (signed) integer
        "u": "INT",  # unsigned integer
        "l": "INT",  # 64bit integer
        "f": "FLOAT",  # floating-point
        "c": "FLOAT",  # complex-floating point
        "m": None,  # timedelta
        "M": "DATETIME",  # datetime
        "O": "OBJECT",  # (Python) objects
        "S": "BYTE",  # (byte-)string
        "U": "STRING",  # Unicode
        "V": None,  # raw data (void)
    }

    def __init__(self, data, cursor_description, db_engine_spec):
        data = data or []

        column_names = []
        dtype = None
        if cursor_description:
            # get deduped list of column names
            column_names = dedup([col[0] for col in cursor_description])

            # fix cursor descriptor with the deduped names
            cursor_description = [
                tuple([column_name, *list(description)[1:]])
                for column_name, description in zip(column_names, cursor_description)
            ]

            # get type for better type casting, if possible
            dtype = db_engine_spec.get_pandas_dtype(cursor_description)

        self.column_names = column_names

        if dtype:
            # put data in a 2D array so we can efficiently access each column;
            # the reshape ensures the shape is 2D in case data is empty
            array = np.array(data, dtype="object").reshape(-1, len(column_names))
            # convert each column in data into a Series of the proper dtype; we
            # need to do this because we can not specify a mixed dtype when
            # instantiating the DataFrame, and this allows us to have different
            # dtypes for each column.
            data = {
                column: pd.Series(array[:, i], dtype=dtype[column])
                for i, column in enumerate(column_names)
            }
            self.df = pd.DataFrame(data, columns=column_names)
        else:
            self.df = pd.DataFrame(list(data), columns=column_names).infer_objects()

        self._type_dict = {}
        try:
            # The driver may not be passing a cursor.description
            self._type_dict = {
                col: db_engine_spec.get_datatype(cursor_description[i][1])
                for i, col in enumerate(column_names)
                if cursor_description
            }
        except Exception as e:
            logging.exception(e)

    @property
    def raw_df(self):
        return self.df

    @property
    def size(self):
        return len(self.df.index)

    @property
    def data(self):
        return self.format_data(self.df)

    @classmethod
    def format_data(cls, df):
        # work around for https://github.com/pandas-dev/pandas/issues/18372
        data = [
            dict(
                (k, maybe_box_datetimelike(v))
                for k, v in zip(df.columns, np.atleast_1d(row))
            )
            for row in df.values
        ]
        for d in data:
            for k, v in list(d.items()):
                # if an int is too big for Java Script to handle
                # convert it to a string
                if isinstance(v, int):
                    if abs(v) > JS_MAX_INTEGER:
                        d[k] = str(v)
        return data

    @classmethod
    def db_type(cls, dtype):
        """Given a numpy dtype, Returns a generic database type"""
        if isinstance(dtype, ExtensionDtype):
            return cls.type_map.get(dtype.kind)
        elif hasattr(dtype, "char"):
            return cls.type_map.get(dtype.char)

    @classmethod
    def datetime_conversion_rate(cls, data_series):
        success = 0
        total = 0
        for value in data_series:
            total += 1
            try:
                pd.to_datetime(value)
                success += 1
            except Exception:
                continue
        return 100 * success / total

    @staticmethod
    def is_date(np_dtype, db_type_str):
        def looks_daty(s):
            if isinstance(s, str):
                return any([s.lower().startswith(ss) for ss in ("time", "date")])
            return False

        if looks_daty(db_type_str):
            return True
        if np_dtype and np_dtype.name and looks_daty(np_dtype.name):
            return True
        return False

    @classmethod
    def is_dimension(cls, dtype, column_name):
        if cls.is_id(column_name):
            return False
        return dtype.name in ("object", "bool")

    @classmethod
    def is_id(cls, column_name):
        return column_name.startswith("id") or column_name.endswith("id")

    @classmethod
    def agg_func(cls, dtype, column_name):
        # consider checking for key substring too.
        if cls.is_id(column_name):
            return "count_distinct"
        if (
            hasattr(dtype, "type")
            and issubclass(dtype.type, np.generic)
            and is_numeric(dtype)
        ):
            return "sum"
        return None

    @property
    def columns(self):
        """Provides metadata about columns for data visualization.

        :return: dict, with the fields name, type, is_date, is_dim and agg.
        """
        if self.df.empty:
            return None

        columns = []
        sample_size = min(INFER_COL_TYPES_SAMPLE_SIZE, len(self.df.index))
        sample = self.df
        if sample_size:
            sample = self.df.sample(sample_size)
        for col in self.df.dtypes.keys():
            db_type_str = self._type_dict.get(col) or self.db_type(self.df.dtypes[col])
            column = {
                "name": col,
                "agg": self.agg_func(self.df.dtypes[col], col),
                "type": db_type_str,
                "is_date": self.is_date(self.df.dtypes[col], db_type_str),
                "is_dim": self.is_dimension(self.df.dtypes[col], col),
            }

            if not db_type_str or db_type_str.upper() == "OBJECT":
                v = sample[col].iloc[0] if not sample[col].empty else None
                if isinstance(v, str):
                    column["type"] = "STRING"
                elif isinstance(v, int):
                    column["type"] = "INT"
                elif isinstance(v, float):
                    column["type"] = "FLOAT"
                elif isinstance(v, (datetime, date)):
                    column["type"] = "DATETIME"
                    column["is_date"] = True
                    column["is_dim"] = False
                # check if encoded datetime
                if (
                    column["type"] == "STRING"
                    and self.datetime_conversion_rate(sample[col])
                    > INFER_COL_TYPES_THRESHOLD
                ):
                    column.update({"is_date": True, "is_dim": False, "agg": None})
            # 'agg' is optional attribute
            if not column["agg"]:
                column.pop("agg", None)
            columns.append(column)
        return columns
